// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//! Tools for generating random geometries
//!
//! The current options provided here were built to support basic correctness testing and
//! benchmarking algorithmic complexity of a large number of scalar functions, many of which
//! have various performance or correctness issues that arise from null features, empty features,
//! polygons with holes, or collections with various numbers of sub-geometries.
//! See <https://github.com/apache/sedona/pull/1680> for the Sedona/Java implementation of spider,
//! which implements a number of other strategies for generating various geometry types.

use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader};
use arrow_array::{BinaryArray, BinaryViewArray};
use arrow_array::{Float64Array, Int32Array};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use geo_types::{
    Coord, Geometry, GeometryCollection, LineString, MultiLineString, MultiPoint, MultiPolygon,
    Point, Polygon, Rect,
};
use rand::distributions::Uniform;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use sedona_common::sedona_internal_err;
use sedona_geometry::types::GeometryTypeId;
use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY};
use std::f64::consts::PI;
use std::sync::Arc;

/// Builder for generating test data partitions with random geometries.
///
/// This builder allows you to create deterministic test datasets with configurable
/// geometry types, data distribution, and partitioning for testing spatial operations.
///
/// The generated data includes:
///
/// - `id`: Unique integer identifier for each row
/// - `dist`: Random floating-point distance value (0.0 to 100.0)
/// - `geometry`: Random geometry data in the specified format (WKB or WKB View)
///
/// The strategy for generating geometries and their options are not stable and may change
/// as the needs of testing and benchmarking evolve or better strategies are discovered.
/// The strategy for generating random geometries is as follows:
///
/// - Points are uniformly distributed over the [Self::bounds] indicated
/// - Linestrings are generated by calculating the points in a circle of a randomly
///   chosen size (according to [Self::size_range]) with vertex count sampled using
///   [Self::vertices_per_linestring_range]. The start and end point of generated
///   linestrings are never connected.
/// - Polygons are generated using a closed version of the linestring generated.
///   They may or may not have a hole according to [Self::polygon_hole_rate].
/// - MultiPoint, MultiLinestring, and MultiPolygon geometries are constructed
///   with the number of parts sampled according to [Self::num_parts_range].
///   The size of the entire feature is constrained to [Self::size_range],
///   and this space is subdivided to obtain the exact number of spaces needed.
///   Child features are generated using the global options except with sizes
///   sampled to approach the space given to them.
///
/// # Example
///
/// ```rust
/// use sedona_testing::datagen::RandomPartitionedDataBuilder;
/// use sedona_geometry::types::GeometryTypeId;
/// use geo_types::{Coord, Rect};
///
/// let (schema, partitions) = RandomPartitionedDataBuilder::new()
///     .seed(42)
///     .num_partitions(4)
///     .rows_per_batch(1000)
///     .geometry_type(GeometryTypeId::Polygon)
///     .bounds(Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 }))
///     .build()
///     .unwrap();
/// ```
#[derive(Debug, Clone)]
pub struct RandomPartitionedDataBuilder {
    pub seed: u64,
    pub num_partitions: usize,
    pub batches_per_partition: usize,
    pub rows_per_batch: usize,
    sedona_type: SedonaType,
    null_rate: f64,
    options: RandomGeometryOptions,
}

impl Default for RandomPartitionedDataBuilder {
    fn default() -> Self {
        let options = RandomGeometryOptions::new();

        Self {
            seed: 42,
            num_partitions: 1,
            batches_per_partition: 1,
            rows_per_batch: 10,
            sedona_type: WKB_GEOMETRY,
            null_rate: 0.0,
            options,
        }
    }
}

impl RandomPartitionedDataBuilder {
    /// Creates a new `RandomPartitionedDataBuilder` with default values.
    ///
    /// Default configuration:
    ///
    /// - seed: 42 (for deterministic results)
    /// - num_partitions: 1
    /// - batches_per_partition: 1
    /// - rows_per_batch: 10
    /// - geometry_type: Point
    /// - bounds: (0,0) to (100,100)
    /// - size_range: 1.0 to 10.0
    /// - null_rate: 0.0 (no nulls)
    /// - empty_rate: 0.0 (no empties)
    /// - vertices_per_linestring_range
    /// - num_parts_range: 1 to 3
    /// - polygon_hole_rate: 0.0 (no polygons with holes)
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the random seed for deterministic data generation.
    ///
    /// Using the same seed will produce identical datasets, which is useful
    /// for reproducible tests.
    ///
    /// # Arguments
    ///
    /// * `seed` - The random seed value
    pub fn seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self
    }

    /// Sets the number of data partitions to generate.
    ///
    /// Each partition contains multiple batches of data. This is useful for
    /// testing distributed processing scenarios.
    ///
    /// # Arguments
    ///
    /// * `num_partitions` - Number of partitions to create
    pub fn num_partitions(mut self, num_partitions: usize) -> Self {
        self.num_partitions = num_partitions;
        self
    }

    /// Sets the number of batches per partition.
    ///
    /// Each batch is a `RecordBatch` containing the specified number of rows.
    ///
    /// # Arguments
    ///
    /// * `batches_per_partition` - Number of batches in each partition
    pub fn batches_per_partition(mut self, batches_per_partition: usize) -> Self {
        self.batches_per_partition = batches_per_partition;
        self
    }

    /// Sets the number of rows per batch.
    ///
    /// This determines the size of each `RecordBatch` that will be generated.
    ///
    /// # Arguments
    ///
    /// * `rows_per_batch` - Number of rows in each batch
    pub fn rows_per_batch(mut self, rows_per_batch: usize) -> Self {
        self.rows_per_batch = rows_per_batch;
        self
    }

    /// Sets the type of geometry to generate.
    ///
    /// Currently supports:
    /// - `GeometryTypeId::Point`: Random points within the specified bounds
    /// - `GeometryTypeId::Polygon`: Random diamond-shaped polygons
    /// - Other types default to point generation
    ///
    /// # Arguments
    ///
    /// * `geom_type` - The geometry type to generate
    pub fn geometry_type(mut self, geom_type: GeometryTypeId) -> Self {
        self.options.geom_type = geom_type;
        self
    }

    /// Sets the Sedona data type for the geometry column.
    ///
    /// This determines how the geometry data is stored (e.g., WKB or WKB View).
    ///
    /// # Arguments
    ///
    /// * `sedona_type` - The Sedona type for geometry storage
    pub fn sedona_type(mut self, sedona_type: SedonaType) -> Self {
        self.sedona_type = sedona_type;
        self
    }

    /// Sets the spatial bounds for geometry generation.
    ///
    /// All generated geometries will be positioned within these bounds.
    /// For polygons, the bounds are used to ensure the entire polygon fits within the area.
    ///
    /// # Arguments
    ///
    /// * `bounds` - Rectangle defining the spatial bounds (min_x, min_y, max_x, max_y)
    pub fn bounds(mut self, bounds: Rect) -> Self {
        self.options.bounds = bounds;
        self
    }

    /// Sets the size range for generated geometries.
    ///
    /// For polygons, this controls the radius of the generated shapes.
    /// For points, this parameter is not used.
    ///
    /// # Arguments
    ///
    /// * `size_range` - Tuple of (min_size, max_size) for geometry dimensions
    pub fn size_range(mut self, size_range: (f64, f64)) -> Self {
        self.options.size_range = size_range;
        self
    }

    /// Sets the rate of null values in the geometry column.
    ///
    /// # Arguments
    ///
    /// * `null_rate` - Fraction of rows that should have null geometry (0.0 to 1.0)
    pub fn null_rate(mut self, null_rate: f64) -> Self {
        self.null_rate = null_rate;
        self
    }

    /// Sets the rate of EMPTY geometries in the geometry column.
    ///
    /// # Arguments
    ///
    /// * `empty_rate` - Fraction of rows that should have empty geometry (0.0 to 1.0)
    pub fn empty_rate(mut self, empty_rate: f64) -> Self {
        self.options.empty_rate = empty_rate;
        self
    }

    /// Sets the vertex count range
    ///
    /// # Arguments
    ///
    /// * `vertices_per_linestring_range` - The minimum and maximum (inclusive) number of vertices
    ///   in linestring output. This also affects polygon output, although the actual number
    ///   of vertices in the polygon ring will be one more than the range indicated here to
    ///   close the polygon.
    pub fn vertices_per_linestring_range(
        mut self,
        vertices_per_linestring_range: (usize, usize),
    ) -> Self {
        self.options.vertices_per_linestring_range = vertices_per_linestring_range;
        self
    }

    /// Sets the number of parts range
    ///
    /// # Arguments
    ///
    /// * `num_parts_range` - The minimum and maximum (inclusive) number of parts
    ///   in multi geometry and/or collection output.
    pub fn num_parts_range(mut self, num_parts_range: (usize, usize)) -> Self {
        self.options.num_parts_range = num_parts_range;
        self
    }

    /// Sets the polygon hole rate
    ///
    /// # Arguments
    ///
    /// * `polygon_hole_rate` - Fraction of polygons that should have an interior
    ///   ring. Currently only a single interior ring is possible.
    pub fn polygon_hole_rate(mut self, polygon_hole_rate: f64) -> Self {
        self.options.polygon_hole_rate = polygon_hole_rate;
        self
    }

    /// The [SchemaRef] generated by this builder
    ///
    /// The resulting schema contains three columns:
    ///
    /// - `id`: Int32 - Unique sequential identifier for each row
    /// - `dist`: Float64 - Random distance value between 0.0 and 100.0
    /// - `geometry`: SedonaType - Random geometry data (WKB or WKB View format)
    pub fn schema(&self) -> SchemaRef {
        // Create schema
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("dist", DataType::Float64, false),
            self.sedona_type.to_storage_field("geometry", true).unwrap(),
        ]))
    }

    /// Builds the random partitioned dataset with the configured parameters.
    ///
    /// Generates a deterministic dataset based on the seed and configuration.
    /// The resulting schema contains three columns:
    /// - `id`: Int32 - Unique sequential identifier for each row
    /// - `dist`: Float64 - Random distance value between 0.0 and 100.0
    /// - `geometry`: SedonaType - Random geometry data (WKB or WKB View format)
    ///
    /// # Returns
    ///
    /// A tuple containing:
    /// - `SchemaRef`: Arrow schema for the generated data
    /// - `Vec<Vec<RecordBatch>>`: Vector of partitions, each containing a vector of record batches
    ///
    /// # Errors
    ///
    /// Returns a `datafusion_common::Result` error if:
    /// - RecordBatch creation fails
    /// - Array conversion fails
    /// - Schema creation fails
    pub fn build(&self) -> Result<(SchemaRef, Vec<Vec<RecordBatch>>)> {
        // Create a seeded random number generator for deterministic results
        let schema = self.schema();
        let mut result = Vec::with_capacity(self.num_partitions);

        for partition_idx in 0..self.num_partitions {
            let rng = Self::default_rng(self.seed + partition_idx as u64);
            let partition_batches = self
                .partition_reader(rng, partition_idx)
                .collect::<Result<Vec<_>, ArrowError>>()?;
            result.push(partition_batches);
        }

        Ok((schema, result))
    }

    /// Generate a [Rng] based on a seed
    ///
    /// Callers can also supply their own [Rng].
    pub fn default_rng(seed: u64) -> impl Rng {
        StdRng::seed_from_u64(seed)
    }

    /// Create a [RecordBatchReader] that reads a single partition
    pub fn partition_reader<R: Rng + Send + 'static>(
        &self,
        rng: R,
        partition_idx: usize,
    ) -> Box<dyn RecordBatchReader + Send> {
        let reader = RandomPartitionedDataReader {
            builder: self.clone(),
            schema: self.schema(),
            partition_idx,
            batch_idx: 0,
            rng,
        };

        Box::new(reader)
    }

    /// Generate a single batch
    fn generate_batch<R: Rng>(
        &self,
        rng: &mut R,
        schema: &SchemaRef,
        partition_idx: usize,
        batch_idx: usize,
    ) -> Result<RecordBatch> {
        // Generate IDs - make them unique across partitions and batches
        let id_start =
            (partition_idx * self.batches_per_partition + batch_idx) * self.rows_per_batch;
        let ids: Vec<i32> = (0..self.rows_per_batch)
            .map(|i| (id_start + i) as i32)
            .collect();

        // Generate random distances between 0.0 and 100.0
        let distance_dist = Uniform::new(0.0, 100.0);
        let distances: Vec<f64> = (0..self.rows_per_batch)
            .map(|_| rng.sample(distance_dist))
            .collect();

        // Generate random geometries based on the geometry type
        let wkb_geometries: Vec<Option<Vec<u8>>> = (0..self.rows_per_batch)
            .map(|_| {
                if rng.sample(Uniform::new(0.0, 1.0)) < self.null_rate {
                    None
                } else {
                    Some(generate_random_wkb(rng, &self.options))
                }
            })
            .collect();

        // Create Arrow arrays
        let id_array = Arc::new(Int32Array::from(ids));
        let dist_array = Arc::new(Float64Array::from(distances));
        let geometry_array = create_wkb_array(wkb_geometries, &self.sedona_type)?;

        // Create RecordBatch
        Ok(RecordBatch::try_new(
            schema.clone(),
            vec![id_array, dist_array, geometry_array],
        )?)
    }
}

/// Create an ArrayRef from a vector of WKB bytes based on the sedona type
fn create_wkb_array(
    wkb_values: Vec<Option<Vec<u8>>>,
    sedona_type: &SedonaType,
) -> Result<ArrayRef> {
    match sedona_type {
        SedonaType::Wkb(_, _) => Ok(Arc::new(BinaryArray::from_iter(wkb_values))),
        SedonaType::WkbView(_, _) => Ok(Arc::new(BinaryViewArray::from_iter(wkb_values))),
        _ => sedona_internal_err!("create_wkb_array not implemented for {sedona_type:?}"),
    }
}

struct RandomPartitionedDataReader<R> {
    builder: RandomPartitionedDataBuilder,
    schema: SchemaRef,
    partition_idx: usize,
    batch_idx: usize,
    rng: R,
}

impl<R: Rng> RecordBatchReader for RandomPartitionedDataReader<R> {
    fn schema(&self) -> SchemaRef {
        self.builder.schema()
    }
}

impl<R: Rng> Iterator for RandomPartitionedDataReader<R> {
    type Item = std::result::Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.batch_idx == self.builder.batches_per_partition {
            return None;
        }

        let maybe_batch = self
            .builder
            .generate_batch(
                &mut self.rng,
                &self.schema,
                self.partition_idx,
                self.batch_idx,
            )
            .map_err(|e| ArrowError::ExternalError(Box::new(e)));
        self.batch_idx += 1;
        Some(maybe_batch)
    }
}

/// Options for the current strategy influencing individual geometry constructors
#[derive(Debug, Clone)]
struct RandomGeometryOptions {
    geom_type: GeometryTypeId,
    bounds: Rect,
    size_range: (f64, f64),
    vertices_per_linestring_range: (usize, usize),
    empty_rate: f64,
    polygon_hole_rate: f64,
    num_parts_range: (usize, usize),
}

impl RandomGeometryOptions {
    fn new() -> Self {
        Self {
            geom_type: GeometryTypeId::Point,
            empty_rate: 0.0,
            bounds: Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 }),
            size_range: (1.0, 10.0),
            vertices_per_linestring_range: (4, 4),
            polygon_hole_rate: 0.0,
            num_parts_range: (1, 3),
        }
    }
}

impl Default for RandomGeometryOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Generate random geometry WKB bytes based on the geometry type
fn generate_random_wkb<R: rand::Rng>(rng: &mut R, options: &RandomGeometryOptions) -> Vec<u8> {
    let geometry = generate_random_geometry(rng, options);

    // Convert geometry to WKB
    let mut out: Vec<u8> = vec![];
    wkb::writer::write_geometry(&mut out, &geometry, Default::default()).unwrap();
    out
}

fn generate_random_geometry<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Geometry {
    match options.geom_type {
        GeometryTypeId::Point => Geometry::Point(generate_random_point(rng, options)),
        GeometryTypeId::LineString => {
            Geometry::LineString(generate_random_linestring(rng, options))
        }
        GeometryTypeId::Polygon => Geometry::Polygon(generate_random_polygon(rng, options)),
        GeometryTypeId::MultiPoint => {
            Geometry::MultiPoint(generate_random_multipoint(rng, options))
        }
        GeometryTypeId::MultiLineString => {
            Geometry::MultiLineString(generate_random_multilinestring(rng, options))
        }
        GeometryTypeId::MultiPolygon => {
            Geometry::MultiPolygon(generate_random_multipolygon(rng, options))
        }
        GeometryTypeId::GeometryCollection => {
            Geometry::GeometryCollection(generate_random_geometrycollection(rng, options))
        }
        GeometryTypeId::Geometry => {
            let mut copy_options = options.clone();
            copy_options.geom_type = pick_random_geometry_type(rng);
            generate_random_geometry(rng, &copy_options)
        }
    }
}

fn generate_random_point<R: rand::Rng>(rng: &mut R, options: &RandomGeometryOptions) -> Point {
    if rng.gen_bool(options.empty_rate) {
        // This is a bit of a hack because geo-types doesn't support empty point; however,
        // this does work with respect to sending this directly to the WKB reader and getting
        // the WKB result we want
        Point::new(f64::NAN, f64::NAN)
    } else {
        // Generate random points within the specified bounds
        let x_dist = Uniform::new(options.bounds.min().x, options.bounds.max().x);
        let y_dist = Uniform::new(options.bounds.min().y, options.bounds.max().y);
        let x = rng.sample(x_dist);
        let y = rng.sample(y_dist);
        Point::new(x, y)
    }
}

fn generate_random_linestring<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> LineString {
    if rng.gen_bool(options.empty_rate) {
        LineString::new(vec![])
    } else {
        let (center_x, center_y, half_size) = generate_random_circle(rng, options);
        let vertices_dist = Uniform::new_inclusive(
            options.vertices_per_linestring_range.0,
            options.vertices_per_linestring_range.1,
        );
        // Always sample in such a way that we end up with a valid linestring
        let num_vertices = rng.sample(vertices_dist).max(2);
        let coords =
            generate_circular_vertices(rng, center_x, center_y, half_size, num_vertices, false);
        LineString::from(coords)
    }
}

fn generate_random_polygon<R: rand::Rng>(rng: &mut R, options: &RandomGeometryOptions) -> Polygon {
    if rng.gen_bool(options.empty_rate) {
        Polygon::new(LineString::new(vec![]), vec![])
    } else {
        let (center_x, center_y, half_size) = generate_random_circle(rng, options);
        let vertices_dist = Uniform::new_inclusive(
            options.vertices_per_linestring_range.0,
            options.vertices_per_linestring_range.1,
        );
        // Always sample in such a way that we end up with a valid Polygon
        let num_vertices = rng.sample(vertices_dist).max(3);
        let coords =
            generate_circular_vertices(rng, center_x, center_y, half_size, num_vertices, true);
        let shell = LineString::from(coords);
        let mut holes = Vec::new();

        // Potentially add a hole based on probability
        let add_hole = rng.gen_bool(options.polygon_hole_rate);
        let hole_scale_factor_dist = Uniform::new(0.1, 0.5);
        let hole_scale_factor = rng.sample(hole_scale_factor_dist);
        if add_hole {
            let new_size = half_size * hole_scale_factor;
            let mut coords =
                generate_circular_vertices(rng, center_x, center_y, new_size, num_vertices, true);
            coords.reverse();
            holes.push(LineString::from(coords));
        }

        Polygon::new(shell, holes)
    }
}

fn generate_random_multipoint<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> MultiPoint {
    if rng.gen_bool(options.empty_rate) {
        MultiPoint::new(vec![])
    } else {
        let children = generate_random_children(rng, options, generate_random_point);
        MultiPoint::new(children)
    }
}

fn generate_random_multilinestring<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> MultiLineString {
    if rng.gen_bool(options.empty_rate) {
        MultiLineString::new(vec![])
    } else {
        let children = generate_random_children(rng, options, generate_random_linestring);
        MultiLineString::new(children)
    }
}

fn generate_random_multipolygon<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> MultiPolygon {
    if rng.gen_bool(options.empty_rate) {
        MultiPolygon::new(vec![])
    } else {
        let children = generate_random_children(rng, options, generate_random_polygon);
        MultiPolygon::new(children)
    }
}

fn generate_random_geometrycollection<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> GeometryCollection {
    if rng.gen_bool(options.empty_rate) {
        GeometryCollection::new_from(vec![])
    } else {
        let children = generate_random_children(rng, options, generate_random_geometry);
        GeometryCollection::new_from(children)
    }
}

fn generate_random_children<R: Rng, T, F: Fn(&mut R, &RandomGeometryOptions) -> T>(
    rng: &mut R,
    options: &RandomGeometryOptions,
    func: F,
) -> Vec<T> {
    let num_parts_dist =
        Uniform::new_inclusive(options.num_parts_range.0, options.num_parts_range.1);
    let num_parts = rng.sample(num_parts_dist);

    // Constrain this feature to the size range indicated in the option
    let (center_x, center_y, half_width) = generate_random_circle(rng, options);
    let feature_bounds = Rect::new(
        Coord {
            x: center_x - half_width,
            y: center_y - half_width,
        },
        Coord {
            x: center_x + half_width,
            y: center_y + half_width,
        },
    );

    let child_bounds = generate_non_overlapping_sub_rectangles(num_parts, &feature_bounds);
    let mut child_options = options.clone();
    child_options.empty_rate = 0.0;

    let mut children = Vec::new();
    for bounds in child_bounds {
        child_options.bounds = bounds;
        let child_size = bounds.height().min(bounds.width());
        child_options.size_range = (child_size * 0.9, child_size);

        // If GeometryCollection, pick a random geometry type
        // Don't support nested GeometryCollection for now to avoid too much recursion
        if options.geom_type == GeometryTypeId::GeometryCollection {
            child_options.geom_type = pick_random_geometry_type(rng);
        }
        children.push(func(rng, &child_options));
    }

    children
}

fn pick_random_geometry_type<R: Rng>(rng: &mut R) -> GeometryTypeId {
    [
        GeometryTypeId::Point,
        GeometryTypeId::LineString,
        GeometryTypeId::Polygon,
        GeometryTypeId::MultiPoint,
        GeometryTypeId::MultiLineString,
        GeometryTypeId::MultiPolygon,
    ][rng.gen_range(0..6)]
}

fn generate_random_circle<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> (f64, f64, f64) {
    // Generate random diamond polygons (rotated squares)
    let size_dist = Uniform::new(options.size_range.0, options.size_range.1);
    let half_size = rng.sample(size_dist) / 2.0;

    // Ensure diamond fits within bounds by constraining center position
    let center_x_dist = Uniform::new(
        options.bounds.min().x + half_size,
        options.bounds.max().x - half_size,
    );
    let center_y_dist = Uniform::new(
        options.bounds.min().y + half_size,
        options.bounds.max().y - half_size,
    );
    let center_x = rng.sample(center_x_dist);
    let center_y = rng.sample(center_y_dist);

    (center_x, center_y, half_size)
}

fn generate_non_overlapping_sub_rectangles(num_parts: usize, bounds: &Rect) -> Vec<Rect> {
    let mut tiles = vec![*bounds];
    let mut n = 0;
    while tiles.len() < num_parts {
        // Find the largest rectangle
        let (largest_idx, _) = tiles
            .iter()
            .enumerate()
            .map(|(i, rect)| (i, rect.height() * rect.width()))
            .max_by(|(_, a1), (_, a2)| a1.partial_cmp(a2).unwrap())
            .unwrap_or((0, 0.0));

        // Mix up subdividing by x and y
        let new_rects = if (n % 2) == 0 {
            tiles[largest_idx].split_x()
        } else {
            tiles[largest_idx].split_y()
        };

        // Remove the largest rectangle and add its subdivisions
        tiles.remove(largest_idx);
        tiles.insert(largest_idx, new_rects[0]);
        tiles.insert(largest_idx, new_rects[1]);
        n += 1;
    }

    tiles
}

fn generate_circular_vertices<R: rand::Rng>(
    rng: &mut R,
    center_x: f64,
    center_y: f64,
    radius: f64,
    num_vertices: usize,
    closed: bool,
) -> Vec<Coord> {
    let mut out = Vec::new();

    // Randomize starting angle (0 to 2 * PI)
    let start_angle_dist = Uniform::new(0.0, 2.0 * PI);
    let mut angle: f64 = rng.sample(start_angle_dist);

    let dangle = 2.0 * PI / (num_vertices as f64).max(3.0);
    for _ in 0..num_vertices {
        out.push(Coord {
            x: angle.cos() * radius + center_x,
            y: angle.sin() * radius + center_y,
        });
        angle += dangle;
    }

    if closed {
        out.push(out[0]);
    }

    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::DataType;
    use geo_traits::{MultiLineStringTrait, MultiPolygonTrait};
    use geo_types::Coord;
    use rand::rngs::StdRng;
    use rand::SeedableRng;
    use rstest::rstest;
    use sedona_geometry::{
        analyze::analyze_geometry, bounds::wkb_bounds_xy, interval::IntervalTrait,
    };

    #[test]
    fn test_generate_random_geometry_produces_valid_wkb() {
        let bounds = Rect::new(Coord { x: 10.0, y: 10.0 }, Coord { x: 90.0, y: 90.0 });
        let size_range = (1.0, 10.0);

        // Test both Point and Polygon geometry types
        let test_cases = vec![
            (GeometryTypeId::Point, 42, 100, 20, 50), // (type, seed, iterations, min_size, max_size)
            (GeometryTypeId::Polygon, 123, 50, 80, 200),
        ];

        for (geom_type, seed, iterations, min_size, max_size) in test_cases {
            let mut rng = StdRng::seed_from_u64(seed);
            let options = RandomGeometryOptions {
                geom_type,
                bounds,
                size_range,
                ..Default::default()
            };

            for _ in 0..iterations {
                let wkb_bytes = generate_random_wkb(&mut rng, &options);

                // Verify WKB is not empty and has reasonable size
                assert!(!wkb_bytes.is_empty());
                assert!(
                    wkb_bytes.len() >= min_size,
                    "WKB size {} is smaller than expected minimum {} for {:?}",
                    wkb_bytes.len(),
                    min_size,
                    geom_type
                );
                assert!(
                    wkb_bytes.len() <= max_size,
                    "WKB size {} is larger than expected maximum {} for {:?}",
                    wkb_bytes.len(),
                    max_size,
                    geom_type
                );

                // Verify WKB can be parsed without error
                wkb::reader::read_wkb(&wkb_bytes).unwrap();
            }
        }
    }

    #[test]
    fn test_generate_random_geometry_deterministic() {
        let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });
        let size_range = (1.0, 10.0);

        let geom_types = [GeometryTypeId::Point, GeometryTypeId::Polygon];

        // Generate with same seed twice
        let mut rng1 = StdRng::seed_from_u64(42);
        let mut rng2 = StdRng::seed_from_u64(42);

        for geom_type in geom_types {
            let options = RandomGeometryOptions {
                geom_type,
                bounds,
                size_range,
                ..Default::default()
            };
            let wkb1 = generate_random_wkb(&mut rng1, &options);
            let wkb2 = generate_random_wkb(&mut rng2, &options);

            // Should generate identical results
            assert_eq!(wkb1, wkb2);
        }
    }

    #[test]
    fn test_random_partitioned_data_builder_build_basic() {
        let (schema, partitions) = RandomPartitionedDataBuilder::new()
            .num_partitions(2)
            .batches_per_partition(3)
            .rows_per_batch(4)
            .null_rate(0.0) // No nulls for easier testing
            .build()
            .unwrap();

        // Verify schema
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(0).data_type(), &DataType::Int32);
        assert_eq!(schema.field(1).name(), "dist");
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
        assert_eq!(schema.field(2).name(), "geometry");

        // Verify partitions structure
        assert_eq!(partitions.len(), 2); // num_partitions

        for partition in &partitions {
            assert_eq!(partition.len(), 3); // batches_per_partition

            for batch in partition {
                assert_eq!(batch.num_rows(), 4); // rows_per_batch
                assert_eq!(batch.num_columns(), 3);
            }
        }
    }

    #[test]
    fn test_random_partitioned_data_builder_unique_ids() {
        let (_, partitions) = RandomPartitionedDataBuilder::new()
            .num_partitions(2)
            .batches_per_partition(2)
            .rows_per_batch(3)
            .build()
            .unwrap();

        let mut all_ids = Vec::new();

        for partition in &partitions {
            for batch in partition {
                let id_array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                for i in 0..id_array.len() {
                    all_ids.push(id_array.value(i));
                }
            }
        }

        // Verify all IDs are unique
        all_ids.sort();
        for i in 1..all_ids.len() {
            assert_ne!(
                all_ids[i - 1],
                all_ids[i],
                "Found duplicate ID: {}",
                all_ids[i]
            );
        }

        // Verify IDs are sequential starting from 0
        for (i, &id) in all_ids.iter().enumerate() {
            assert_eq!(id, i as i32);
        }
    }

    #[test]
    fn test_random_partitioned_data_builder_null_rate() {
        let (_, partitions) = RandomPartitionedDataBuilder::new()
            .rows_per_batch(100)
            .null_rate(0.5) // 50% null rate
            .build()
            .unwrap();

        let batch = &partitions[0][0];
        let geometry_array = batch.column(2);

        let null_count = geometry_array.null_count();
        let total_count = geometry_array.len();
        let null_rate = null_count as f64 / total_count as f64;

        // Allow some variance due to randomness (±20%)
        assert!(
            (0.3..=0.7).contains(&null_rate),
            "Expected null rate around 0.5, got {null_rate}"
        );
    }

    #[test]
    fn test_random_partitioned_data_builder_deterministic() {
        let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });

        let (schema1, partitions1) = RandomPartitionedDataBuilder::new()
            .seed(999)
            .num_partitions(2)
            .batches_per_partition(2)
            .rows_per_batch(5)
            .bounds(bounds)
            .build()
            .unwrap();

        let (schema2, partitions2) = RandomPartitionedDataBuilder::new()
            .seed(999) // Same seed
            .num_partitions(2)
            .batches_per_partition(2)
            .rows_per_batch(5)
            .bounds(bounds)
            .build()
            .unwrap();

        // Schemas should be identical
        assert_eq!(schema1, schema2);

        // All data should be identical
        assert_eq!(partitions1.len(), partitions2.len());
        for (partition1, partition2) in partitions1.iter().zip(partitions2.iter()) {
            assert_eq!(partition1.len(), partition2.len());
            for (batch1, batch2) in partition1.iter().zip(partition2.iter()) {
                // Compare IDs
                let ids1 = batch1
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                let ids2 = batch2
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap();
                assert_eq!(ids1, ids2);

                // Compare distances
                let dists1 = batch1
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap();
                let dists2 = batch2
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float64Array>()
                    .unwrap();
                assert_eq!(dists1, dists2);
            }
        }
    }

    #[test]
    fn test_random_partitioned_data_builder_different_seeds() {
        let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });

        let (_, partitions1) = RandomPartitionedDataBuilder::new()
            .seed(111)
            .rows_per_batch(10)
            .bounds(bounds)
            .build()
            .unwrap();

        let (_, partitions2) = RandomPartitionedDataBuilder::new()
            .seed(222) // Different seed
            .rows_per_batch(10)
            .bounds(bounds)
            .build()
            .unwrap();

        // Data should be different (distances should differ)
        let dists1 = partitions1[0][0]
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let dists2 = partitions2[0][0]
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        // At least some distances should be different
        let mut found_difference = false;
        for i in 0..dists1.len() {
            if (dists1.value(i) - dists2.value(i)).abs() > f64::EPSILON {
                found_difference = true;
                break;
            }
        }
        assert!(
            found_difference,
            "Expected different random data with different seeds"
        );
    }

    #[test]
    fn test_random_linestring_num_vertices() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();
        options.vertices_per_linestring_range = (3, 3);
        for _ in 0..100 {
            let geom = generate_random_linestring(&mut rng, &options);
            assert_eq!(geom.coords().count(), 3);
        }

        options.vertices_per_linestring_range = (50, 50);
        for _ in 0..100 {
            let geom = generate_random_linestring(&mut rng, &options);
            assert_eq!(geom.coords().count(), 50);
        }
    }

    #[test]
    fn test_random_polygon_has_hole() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();

        options.polygon_hole_rate = 0.0;
        for _ in 0..100 {
            let geom = generate_random_polygon(&mut rng, &options);
            assert_eq!(geom.interiors().len(), 0);
        }

        options.polygon_hole_rate = 1.0;
        for _ in 0..100 {
            let geom = generate_random_polygon(&mut rng, &options);
            assert!(!geom.interiors().is_empty());
        }
    }

    #[test]
    fn test_random_multipoint_part_count() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();

        options.num_parts_range = (3, 3);
        for _ in 0..100 {
            let geom = generate_random_multipoint(&mut rng, &options);
            assert_eq!(geom.len(), 3);
        }

        options.num_parts_range = (10, 10);
        for _ in 0..100 {
            let geom = generate_random_multipoint(&mut rng, &options);
            assert_eq!(geom.len(), 10);
        }
    }

    #[test]
    fn test_random_multilinestring_part_count() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();

        options.num_parts_range = (3, 3);
        for _ in 0..100 {
            let geom = generate_random_multilinestring(&mut rng, &options);
            assert_eq!(geom.num_line_strings(), 3);
        }

        options.num_parts_range = (10, 10);
        for _ in 0..100 {
            let geom = generate_random_multilinestring(&mut rng, &options);
            assert_eq!(geom.num_line_strings(), 10);
        }
    }

    #[test]
    fn test_random_multipolygon_part_count() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();

        options.num_parts_range = (3, 3);
        for _ in 0..100 {
            let geom = generate_random_multipolygon(&mut rng, &options);
            assert_eq!(geom.num_polygons(), 3);
        }

        options.num_parts_range = (10, 10);
        for _ in 0..100 {
            let geom = generate_random_multipolygon(&mut rng, &options);
            assert_eq!(geom.num_polygons(), 10);
        }
    }

    #[test]
    fn test_random_geometrycollection_part_count() {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();

        options.num_parts_range = (3, 3);
        for _ in 0..100 {
            let geom = generate_random_geometrycollection(&mut rng, &options);
            assert_eq!(geom.len(), 3);
        }

        options.num_parts_range = (10, 10);
        for _ in 0..100 {
            let geom = generate_random_geometrycollection(&mut rng, &options);
            assert_eq!(geom.len(), 10);
        }
    }

    #[rstest]
    fn test_random_geometry_type(
        #[values(
            GeometryTypeId::Point,
            GeometryTypeId::LineString,
            GeometryTypeId::Polygon,
            GeometryTypeId::MultiPoint,
            GeometryTypeId::MultiLineString,
            GeometryTypeId::MultiPolygon,
            GeometryTypeId::GeometryCollection
        )]
        geom_type: GeometryTypeId,
    ) {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();
        options.geom_type = geom_type;

        options.empty_rate = 0.0;
        for _ in 0..100 {
            let geom = generate_random_wkb(&mut rng, &options);
            let wkb = wkb::reader::read_wkb(&geom).unwrap();
            let analysis = analyze_geometry(&wkb).unwrap();
            assert_eq!(analysis.geometry_type.geometry_type(), geom_type);
        }
    }

    #[rstest]
    fn test_random_emptiness(
        #[values(
            GeometryTypeId::Point,
            GeometryTypeId::LineString,
            GeometryTypeId::Polygon,
            GeometryTypeId::MultiPoint,
            GeometryTypeId::MultiLineString,
            GeometryTypeId::MultiPolygon,
            GeometryTypeId::GeometryCollection
        )]
        geom_type: GeometryTypeId,
    ) {
        let mut rng = StdRng::seed_from_u64(123);
        let mut options = RandomGeometryOptions::new();
        options.geom_type = geom_type;

        options.empty_rate = 0.0;
        for _ in 0..100 {
            let geom = generate_random_wkb(&mut rng, &options);
            let bounds = wkb_bounds_xy(&geom).unwrap();
            assert!(!bounds.x().is_empty());
            assert!(!bounds.y().is_empty());

            assert!(
                bounds.x().lo() >= options.bounds.min().x
                    && bounds.y().lo() >= options.bounds.min().y
                    && bounds.x().hi() <= options.bounds.max().x
                    && bounds.y().hi() <= options.bounds.max().y
            );
        }

        options.empty_rate = 1.0;
        for _ in 0..100 {
            let geom = generate_random_wkb(&mut rng, &options);
            let bounds = wkb_bounds_xy(&geom).unwrap();
            assert!(bounds.x().is_empty());
            assert!(bounds.y().is_empty());
        }
    }
}
